连接池介绍 psycopg2的连接池非常的简单,源码也没几行。官网介绍见。源码如下: import psycopg2 from psycopg2 import extensions as _ext class PoolError(psycopg2.Error): pass class AbstractConnectionPool(object): """Generic key-based pooling code.""" def __init__(self, minconn, maxconn, *args, **kwargs): """Initialize the connection pool. New 'minconn' connections are created immediately calling 'connfunc' with given parameters. The connection pool will support a maximum of about 'maxconn' connections. """ self.minconn = int(minconn) self.maxconn = int(maxconn) self.closed = False self._args = args self._kwargs = kwargs self._pool = [] self._used = {} self._rused = {} # id(conn) -> key map self._keys = 0 for i in range(self.minconn): self._connect() def _connect(self, key=None): """Create a new connection and assign it to 'key' if not None.""" conn = psycopg2.connect(*self._args, **self._kwargs) if key is not None: self._used[key] = conn self._rused[id(conn)] = key else: self._pool.append(conn) return conn def _getkey(self): """Return a new unique key.""" self._keys += 1 return self._keys def _getconn(self, key=None): """Get a free connection and assign it to 'key' if not None.""" if self.closed: raise PoolError("connection pool is closed") if key is None: key = self._getkey() if key in self._used: return self._used[key] if self._pool: self._used[key] = conn = self._pool.pop() self._rused[id(conn)] = key return conn else: if len(self._used) == self.maxconn: raise PoolError("connection pool exhausted") return self._connect(key) def _putconn(self, conn, key=None, close=False): """Put away a connection.""" if self.closed: raise PoolError("connection pool is closed") if key is None: key = self._rused.get(id(conn)) if key is None: raise PoolError("trying to put unkeyed connection") if len(self._pool) < self.minconn and not close: # Return the connection into a consistent state before putting # it back into the pool if not conn.closed: status = if status == _ext.TRANSACTION_STATUS_UNKNOWN: # server 
connection lost conn.close() elif status != _ext.TRANSACTION_STATUS_IDLE: # connection in error or in transaction conn.rollback() self._pool.append(conn) else: # regular idle connection self._pool.append(conn) # If the connection is closed, we just discard it. else: conn.close() # here we check for the presence of key because it can happen that a # thread tries to put back a connection after a call to close if not self.closed or key in self._used: del self._used[key] del self._rused[id(conn)] def _closeall(self): """Close all connections. Note that this can lead to some code fail badly when trying to use an already closed connection. If you call .closeall() make sure your code can deal with it. """ if self.closed: raise PoolError("connection pool is closed") for conn in self._pool + list(self._used.values()): try: conn.close() except Exception: pass self.closed = True class SimpleConnectionPool(AbstractConnectionPool): """A connection pool that can't be shared across different threads.""" getconn = AbstractConnectionPool._getconn putconn = AbstractConnectionPool._putconn closeall = AbstractConnectionPool._closeall class ThreadedConnectionPool(AbstractConnectionPool): """A connection pool that works with the threading module.""" def __init__(self, minconn, maxconn, *args, **kwargs): """Initialize the threading lock.""" import threading AbstractConnectionPool.__init__( self, minconn, maxconn, *args, **kwargs) self._lock = threading.Lock() def getconn(self, key=None): """Get a free connection and assign it to 'key' if not None.""" self._lock.acquire() try: return self._getconn(key) finally: self._lock.release() def putconn(self, conn=None, key=None, close=False): """Put away an unused connection.""" self._lock.acquire() try: self._putconn(conn, key, close) finally: self._lock.release() def closeall(self): """Close all connections (even the one currently in use.)""" self._lock.acquire() try: self._closeall() finally: self._lock.release()

AbstractConnectionPool 是一个基类,SimpleConnectionPool 是基类的简单实现(不能在多个线程之间共享),ThreadedConnectionPool 是线程安全的实现。从上面的源码中也能看出,ThreadedConnectionPool 类的 _lock 属性是一个 threading.Lock() 实例,它保证同一时间只允许一个线程对连接池执行 get、put、close 操作。

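报错信息解释 "connection pool exhausted"

从源码可以看出,_getconn 方法在空闲队列 _pool 中没有可用连接、并且正在使用的连接数 len(self._used) 已经等于 maxconn 时,会抛出该错误。也就是说,连接池中的连接已经全部被取走并且还没有归还。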


"trying to put unkeyed connection"

通过阅读源码,能够看出报该错的条件。getconn 方法得到连接的同时会把连接记录到 _used 和 _rused 中。putconn 方法不是把连接 close 了,而是把连接放回了池中(可以先简单地这么认为)。如果调用 putconn 方法时没有传入 key,并且在 _rused 中也没有找到该连接的记录,说明这个连接并没有“正在使用中”:这个连接可能已经 close 了,也可能已经放回池中了(放回 _pool 队列中了)。试图归还一个没有在使用中的连接就报错了。


连接池代码样例

from contextlib import contextmanager
from threading import Semaphore

import psycopg2
from psycopg2 import pool, extensions

pgsql_config = {
    'user': '***',
    'password': '***',
    'host': '***',
    'port': '***',
    'database': '***'
}


class ReallyThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    """ThreadedConnectionPool guarded by a semaphore of 'maxconn' permits.

    getconn() takes a permit before asking the parent pool for a connection
    and putconn() gives it back afterwards, so a caller waits on the
    semaphore when all permits are taken.
    """

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Create the semaphore, then initialize the parent pool."""
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, key=None):
        """Take a permit and return a connection from the parent pool."""
        self._semaphore.acquire()
        try:
            return super().getconn(key)
        except BaseException:
            # No connection was handed out, so give the permit back;
            # otherwise every failed getconn() would shrink the pool forever.
            self._semaphore.release()
            raise

    def putconn(self, *args, **kwargs):
        """Return a connection to the parent pool and release its permit."""
        # Release only after a successful put: if the parent rejects the
        # connection, no permit was held for it.
        super().putconn(*args, **kwargs)
        self._semaphore.release()


cnxpool = ReallyThreadedConnectionPool(5, 10, **pgsql_config)


@contextmanager
def get_cursor():
    """Yield a cursor on a pooled connection and always return the connection.

    Failures while obtaining the connection or creating the cursor propagate
    to the caller. A psycopg2.Error raised inside the ``with`` body is
    printed and suppressed, so the code after the ``with`` block keeps
    running.
    """
    # Acquire outside the try blocks so the cleanup below never touches a
    # name that was not bound.
    con = cnxpool.getconn()
    try:
        cursor = con.cursor()
        try:
            yield cursor
        except psycopg2.Error as e:
            print(e)
        finally:
            cursor.close()
    finally:
        cnxpool.putconn(con)


class PyPgsql(object):
    """Small helper namespace for running queries through the pool."""

    @staticmethod
    def get_all(sql):
        """Execute 'sql' and return all rows.

        Returns None when the query fails, because get_cursor() prints and
        suppresses psycopg2 errors.
        """
        with get_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


if __name__ == '__main__':
    import time
    from concurrent.futures import ThreadPoolExecutor

    def t(n):
        r = PyPgsql.get_all("select * from TABLE")
        print(r)

    s = time.time()
    # Named 'executor' so the module-level name 'pool' (psycopg2.pool,
    # imported above) is not rebound.
    with ThreadPoolExecutor(max_workers=15) as executor:
        for i in range(20):
            executor.submit(t, i)
    e = time.time()
    print(e - s)






